package com.ouyunc.mq.config.kafka;


import com.ouyunc.mq.config.kafka.builder.impl.KafkaTemplateBuilder;
import com.ouyunc.mq.config.kafka.properties.CommonProperties;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author fangzhenxun
 * @Description Spring configuration class that exposes the Kafka operation template
 *              and the Kafka administration beans.
 * @Date 2020/3/13 10:58
 **/
@Slf4j
@Configuration
public class KafkaMqConfig {

    /**
     * Builder used to assemble the Kafka template.
     */
    @Autowired
    private KafkaTemplateBuilder kafkaTemplateBuilder;

    /**
     * Common Kafka properties; the bootstrap server address is read from here.
     */
    @Autowired
    private CommonProperties commonProperties;

    /**
     * @Author fangzhenxun
     * @Description Creates the Kafka template by delegating to the injected builder.
     * @Date 2020/3/13 11:02
     * @return org.springframework.kafka.core.KafkaTemplate<java.lang.String,java.lang.Object>
     **/
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return kafkaTemplateBuilder.build();
    }

    /**
     * @Author fangzhenxun
     * @Description Defines the KafkaAdmin bean, configured only with the bootstrap servers
     *              taken from the common properties. The original author notes that this bean
     *              can detect whether a topic exists in the cluster and create it if it does not.
     * @Date 2020/3/16 11:55
     * @return org.springframework.kafka.core.KafkaAdmin
     **/
    @Bean
    public KafkaAdmin kafkaAdmin() {
        return new KafkaAdmin(bootstrapOnlyConfig(commonProperties.getBootstrapServers()));
    }

    /**
     * @Author fangzhenxun
     * @Description Kafka admin client, which can be used to create topics dynamically.
     *              Only the bootstrap servers entry is copied from the given KafkaAdmin's
     *              configuration; no other setting is carried over.
     * @Date 2020/3/16 12:24
     * @param kafkaAdmin the KafkaAdmin whose bootstrap servers setting is reused
     * @return org.apache.kafka.clients.admin.AdminClient
     **/
    @Bean
    public AdminClient adminClient(KafkaAdmin kafkaAdmin) {
        Map<String, Object> adminSettings = kafkaAdmin.getConfigurationProperties();
        Object servers = adminSettings.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG);
        return AdminClient.create(bootstrapOnlyConfig(servers));
    }

    /**
     * Builds a configuration map holding a single entry: the Kafka connection address
     * stored under {@link AdminClientConfig#BOOTSTRAP_SERVERS_CONFIG}.
     *
     * @param bootstrapServers the value to store as the bootstrap servers setting
     * @return a new mutable map containing only that entry
     */
    private static Map<String, Object> bootstrapOnlyConfig(Object bootstrapServers) {
        Map<String, Object> config = new HashMap<>(1);
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return config;
    }
}
